package org.example.supervise.supervisioncenter.holographicarchives

import java.sql.{CallableStatement, Connection, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.spark.sql.EsSparkSQL
import org.example.constant.ApolloConst
import org.example.utils.MysqlUtil
import org.joda.time.DateTime
import org.slf4j.{Logger, LoggerFactory}

/**
 * 描述:监管端-监管中心-全息档案-企业安全生产信息
 * 脚本位置：/opt/software/project-jgd/project-script/supervise/supervisioncenter/EnterpriseSafetyProductionInfo.sh
 * datas：数据开发-离线作业-大数据服务平台-jgd-监管中心-全息档案-EnterpriseSafetyProductionInfo
 */
object EnterpriseSafetyProductionInfo {

  private val LOGGER: Logger = LoggerFactory.getLogger(this.getClass)
  private val MYSQL_TABLE: String = "zcov.enterprise_safety_production_info_day"
  private val WRITE_MODE: String = "append"

  /**
   * Entry point. Rebuilds yesterday's per-enterprise safety-production statistics
   * and appends them to MySQL ([[MYSQL_TABLE]]).
   *
   * Steps:
   *  1. Delete rows already written for yesterday so the job is idempotent on re-run.
   *  2. Load the enterprise list from MySQL.
   *  3. Aggregate risk totals / unprocessed-risk counts from Elasticsearch.
   *  4. Left-join (so enterprises with no risks still get a row) and persist.
   *
   * @param args unused command-line arguments
   */
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession
      .builder()
//      .appName("EnterpriseSafetyProductionInfo")
//      .master("local[*]")
      .config("es.nodes", ApolloConst.esNodes)
      .config("es.port", ApolloConst.esPort)
      .config("es.mapping.date.rich", "false") // tolerate malformed date values coming from ES
      .config("spark.sql.caseSensitive", "true") // field names are case-sensitive
      .getOrCreate()

    // Stop the session even if any stage below throws, so the job releases cluster resources.
    try {
      val from: String = new DateTime().plusDays(-1).toString("yyyy-MM-dd")
      val to: String = new DateTime().toString("yyyy-MM-dd")
      // Remove any rows previously written for yesterday before appending fresh ones.
      deleteTableData(MYSQL_TABLE, from, to)

      // Enterprise basic information: the driving table of the final left join.
      val enterpriseSql =
        """
          |select
          | enterprise_code
          |from
          | zcov.basic_enterprise_info
          |""".stripMargin
      val vehSet: ResultSet = MysqlUtil.getMysqlQueryResult(enterpriseSql)
      val vehFrame: DataFrame = MysqlUtil
        .resultSetToDataframe(vehSet, sparkSession)
        .select("enterprise_code")
        .toDF("enterpriseCode")
      vehFrame.createOrReplaceTempView("enterpriseTab")

      // Total risk count and unprocessed risk count per enterprise (from ES).
      import sparkSession.implicits._
      val frame_Risk: DataFrame = getRiskNumAndNoRiskNum(sparkSession)
        .filter(x => x.getString(0) != null) // drop rows where the enterprise key is missing
        .map(x => {
          // Column 0 is "enterpriseCode_enterpriseName"; keep only the code part.
          // NOTE(review): split("_")(0) assumes the code itself contains no '_' — confirm.
          (x.getString(0).split("_")(0), x.getString(1), x.getString(2), x.getLong(3), x.getLong(4))
        })
        .toDF("enterpriseCode", "riskTypeCode", "riskName", "cn1", "cn2")
      frame_Risk.createOrReplaceTempView("RiskTable")

      // Hidden-danger counts: not required as of V3.2, so kept commented out for reference.
      /*val hiddenDangerSql =
        """
          |select
          | b.base_enterprise_code enterpriseCode,
          | date_format(a.gmt_create,'%Y-%m-%d') as dealDate,
          | count(1) as total,
          | count(case when a.deal_status in(3,4,9) then 1 else null end) as deal
          |from
          | (select dept_code,deal_status,gmt_create from hdsp_itms.supervise_process_info where date_format(gmt_create,'%Y-%m-%d')=date_format(date_sub(curdate(),interval 1 day),'%Y-%m-%d')) a
          |inner join
          | (select enterprise_code,base_enterprise_code from hdsp_itms.yz_enterprise_info_archive) b
          |on a.dept_code = b.enterprise_code COLLATE utf8mb4_unicode_ci
          |where b.base_enterprise_code is not null
          |group by a.dept_code
          |""".stripMargin
      val hiddenDangerSet: ResultSet = MysqlUtil.getMysqlQueryResult(hiddenDangerSql)
      val hiddenDangerFrame: DataFrame = MysqlUtil
        .resultSetToDataframe(hiddenDangerSet, sparkSession)
        .select("enterpriseCode", "dealDate", "total", "deal")
        .toDF("enterpriseCode", "dealDate", "total", "deal")
      hiddenDangerFrame.createOrReplaceTempView("hiddenDangerTable")
      */

      // Left join keeps every enterprise; the two literal 0 columns are the
      // hidden-danger counters dropped in V3.2 (schema still expects them).
      val resSql =
        s"""
          |select
          | a.enterpriseCode,
          | b.cn1,
          | b.cn2,
          | 0,
          | 0,
          | '$from',
          | b.riskTypeCode,
          | b.riskName
          |from
          | enterpriseTab a
          |left join RiskTable b
          | on a.enterpriseCode = b.enterpriseCode
          |""".stripMargin
      val resFrame: DataFrame = sparkSession
        .sql(resSql)
        .toDF("enterprise_code", "risk_count", "treat_risk_count", "hide_danger_count", "treat_hide_danger_count", "gmt_date", "risk_type_code", "risk_type_name")
      MysqlUtil.saveDataToMysql(resFrame, MYSQL_TABLE, WRITE_MODE)
      LOGGER.info("数据已经插入到mysql表：" + MYSQL_TABLE + "中")
    } finally {
      sparkSession.stop()
    }
  }


  /**
   * Deletes rows of the given table whose `gmt_date` falls in `[from, to)`.
   *
   * The date bounds are bound as JDBC parameters (not concatenated into the SQL)
   * to avoid injection and quoting mistakes. The table name cannot be bound as a
   * parameter; it comes from the trusted private constant [[MYSQL_TABLE]].
   *
   * @param tableName MySQL table name (trusted, caller-controlled constant)
   * @param from      inclusive start date, formatted yyyy-MM-dd
   * @param to        exclusive end date, formatted yyyy-MM-dd
   */
  private def deleteTableData(tableName: String, from: String, to: String): Unit = {
    val conn: Connection = MysqlUtil.getMysqlConn()
    try {
      val deleteTableSql = "delete from " + tableName + " where gmt_date >= ? and gmt_date < ?"
      val statement: PreparedStatement = conn.prepareStatement(deleteTableSql)
      try {
        statement.setString(1, from)
        statement.setString(2, to)
        statement.execute()
      } finally {
        statement.close() // always release the statement, even on failure
      }
    } finally {
      conn.close() // always release the connection, even on failure
    }
  }

  /**
   * Queries Elasticsearch for yesterday's risk details and aggregates, per
   * (enterprise, risk type): the total risk count and the unprocessed risk count.
   *
   * @param sparkSession active SparkSession (must be configured with es.nodes/es.port)
   * @return DataFrame with columns: concat(enterpriseCode,"_",enterpriseName),
   *         riskTypeCode, riskName, cn1 (total risks), cn2 (unprocessed risks)
   */
  def getRiskNumAndNoRiskNum(sparkSession: SparkSession): DataFrame = {
    // Epoch-millis bounds for yesterday: [yesterday 00:00:00, today 00:00:00).
    val format = new SimpleDateFormat("yyyy-MM-dd 00:00:00")
    val time = new DateTime().toString("yyyy-MM-dd 00:00:00")
    val time1 = new DateTime().plusDays(-1).toString("yyyy-MM-dd 00:00:00")
    val to: Long = format.parse(time).getTime
    val from: Long = format.parse(time1).getTime
    // Build the ES query: riskDate in [from, to).
    val builder: SearchSourceBuilder = new SearchSourceBuilder()
    val queryBuilder = QueryBuilders.boolQuery()
    queryBuilder.must(QueryBuilders.rangeQuery("riskDate").gte(from).lt(to))
    builder.query(queryBuilder)
    val esQuery = builder.toString // serialize the query DSL to JSON for EsSparkSQL
    // Read the matching ES documents into a DataFrame.
    val riskDetailFrame = EsSparkSQL.esDF(sparkSession, "risk_detail_index/_doc", esQuery)
    // Register as a temp view so the aggregation below can be expressed in SQL.
    riskDetailFrame.createOrReplaceTempView("risk_detail")
    val sql =
      """
        |  select concat(vehicleEnterpriseCode,"_",vehicleEnterprise), -- enterprise key
        |         riskTypeCode,-- risk type code
        |         riskName,-- risk name
        |         count(1) as cn1,-- total risks
        |         count(case when processStatus='0' then 1 else null end) as cn2 -- unprocessed risks
        |    from risk_detail
        |group by concat(vehicleEnterpriseCode,"_",vehicleEnterprise),riskTypeCode,riskName
      """.stripMargin
    val dataFrame: DataFrame = sparkSession.sql(sql)
    dataFrame
  }

}
